package com.yzq.os.spider.v.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.log4j.Logger;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;

import com.yzq.os.spider.v.Constants;
import com.yzq.os.spider.v.domain.ListPageConfig;
import com.yzq.os.spider.v.domain.LogRecord;
import com.yzq.os.spider.v.domain.QueryURL;
import com.yzq.os.spider.v.domain.SearchEngine;
import com.yzq.os.spider.v.service.domain.SpiderRecordService;
import com.yzq.os.spider.v.service.domain.ListPageConfigService;
import com.yzq.os.spider.v.service.domain.LogRecordService;
import com.yzq.os.spider.v.service.domain.QueryURLService;
import com.yzq.os.spider.v.service.domain.SearchEngineParamService;
import com.yzq.os.spider.v.service.domain.SearchEngineService;
import com.yzq.os.spider.v.service.domain.ServerService;
import com.yzq.os.spider.v.service.http.HttpClientService;
import com.yzq.os.spider.v.service.spider.BeforeCrawlProcessor;
import com.yzq.os.spider.v.service.spider.CompletionCrawl;
import com.yzq.os.spider.v.service.spider.SpiderTask;
import com.yzq.os.spider.v.util.EncodeUtil;

/**
 * Crawl scheduler for a single search engine, running on its own thread.
 * <p>
 * {@link #run()} loads the engine and its list-page configuration, optionally runs the engine's
 * configured {@link BeforeCrawlProcessor}, and then loops: it fetches a batch of pending
 * {@link QueryURL}s, submits one {@link SpiderTask} per URL to the pool created by
 * {@link #initializeThreadPoolExecutor(int, int, int)}, and sleeps while the pool is still busy.
 * When neither the database nor the pool has work, it may run a one-off {@link CompletionCrawl}
 * (local master only). It finishes after {@code MAX_WAIT_BREAK_TIME} idle polls or once
 * {@link #stopThread()} has been called. A summary {@link LogRecord} is then saved.
 * <p>
 * Per-engine statistics (start time, processed query-URL count, crawled record count) live in
 * static maps shared by all instances. They are read through the static {@code take*} methods and
 * updated through {@link #addQueryCount(int, int)} / {@link #addJobCount(int, int)} (presumably
 * by the spider tasks; confirm against callers).
 *
 * @author DEV-MARTIN
 */
@Service
@Scope("prototype")
public class CrawlService extends Thread {

	private static final Logger logger = Logger.getLogger(CrawlService.class);

	/**
	 * Crawl start time per search engine id. Its key set doubles as the set of engines whose crawl
	 * is currently running (see {@link #getRunningEngineId()}).
	 */
	private static final Map<Integer, Date> START_DATE_MAP = new ConcurrentHashMap<Integer, Date>();

	/** Number of crawled records per search engine id. */
	private static final Map<Integer, AtomicInteger> CRAWLED_JOB_COUNT_MAP = new ConcurrentHashMap<Integer, AtomicInteger>();

	/** Number of processed query URLs per search engine id. */
	private static final Map<Integer, AtomicInteger> QUERYED_URL_COUNT_MAP = new ConcurrentHashMap<Integer, AtomicInteger>();

	/**
	 * Idle polls so far. It is incremented each time neither the database nor the pool has work.
	 * It is reset to 0 when new query URLs are found or after the completion crawl has run.
	 */
	private int currWaitBreakTime = 0;

	/** Number of idle polls after which the scheduling loop gives up and finishes. */
	private static final int MAX_WAIT_BREAK_TIME = 100;

	/** Sleep between two idle polls, in seconds. */
	private static final int WAIT_BREAK_INTERVAL_SECOND = 6;

	/** Sleep used when the database has no new URLs but the pool still has active or queued tasks, in seconds. */
	private static final int BUSY_POOL_SLEEP_SECOND = 5;

	/** Granularity of the blocking wait for pool termination, in seconds. */
	private static final long TERMINATION_POLL_SECOND = 1;

	@Autowired
	private SearchEngineService searchEngineService;

	@Autowired
	private SearchEngineParamService searchEngineParamService;

	@Autowired
	private QueryURLService queryURLService;

	@Autowired
	private ListPageConfigService listPageConfigService;

	@Autowired
	private SpiderRecordService crawJobService;

	@Autowired
	private ServerService serverService;

	@Autowired
	private LogRecordService logRecordService;

	/** Search engine being crawled; loaded at the start of {@link #run()}. */
	private SearchEngine searchEngine;

	/** List-page configuration of the engine; loaded at the start of {@link #run()}. */
	private ListPageConfig listPageConfig;

	/** Pool that executes the spider tasks; created by {@link #initializeThreadPoolExecutor(int, int, int)}. */
	private ThreadPoolExecutor executor;

	/** Id of the search engine to crawl. */
	private int searchEngineId;

	/** Crawl date handed to spider tasks, the completion crawl and the saved log record. */
	private Date markDate;

	/** Table name handed to spider tasks and the completion crawl for saving crawled records. */
	private String jobSaveTableName;

	/** Batch size passed to {@code findWillDoWithTryTime} on every poll. */
	private int findUrlSize;

	/**
	 * Creates the task pool. The queue is bounded, and when it is full the submitting (crawl)
	 * thread runs the task itself ({@link ThreadPoolExecutor.CallerRunsPolicy}), which throttles
	 * submission.
	 *
	 * @param minimumPoolSize
	 *            core number of worker threads
	 * @param maximumPoolSize
	 *            maximum number of worker threads
	 * @param arrayQueueSize
	 *            capacity of the task queue
	 */
	public void initializeThreadPoolExecutor(int minimumPoolSize, int maximumPoolSize, int arrayQueueSize) {
		this.executor = new ThreadPoolExecutor(minimumPoolSize, maximumPoolSize, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(arrayQueueSize), new ThreadPoolExecutor.CallerRunsPolicy());
	}

	/**
	 * Keep-running flag checked at the top of every scheduling-loop iteration. It is written by
	 * {@link #stopThread()} from another thread, hence {@code volatile}: without it the crawl
	 * thread is not guaranteed to ever observe the change.
	 */
	private volatile boolean runnable = true;

	/** Completion crawl instance; created lazily, at most once per run. */
	private CompletionCrawl completionCrawl;

	/**
	 * Runs one complete crawl of the configured search engine.
	 * <p>
	 * Whatever happens (success, failed before-crawl processor, exception), the pool is shut down
	 * and the engine is unregistered via {@code SpiderRecordService.removeRunning}. The log record
	 * is saved and the statistics are cleared only if the crawl actually started.
	 */
	@Override
	public void run() {
		String searchEngineName = String.valueOf(searchEngineId);
		boolean crawlStarted = false;
		boolean interrupted = false;
		try {
			// Reloaded on every run: changes to the engine or its list-page config only take
			// effect the next time this engine's crawl is started.
			searchEngine = searchEngineService.findById(searchEngineId);
			if (searchEngine == null) {
				logger.error("Search engine not found, searchEngineId:[" + searchEngineId + "]");
				return;
			}
			listPageConfig = listPageConfigService.findBySearchEngineId(searchEngineId);
			Date startDate = new Date();
			searchEngineName = searchEngine.getName();
			ApplicationContext context = Constants.getApplicationContext();
			HttpClientService httpClientService = context.getBean("httpClientService", HttpClientService.class);

			logger.info(EncodeUtil.gbk2iso("BEGIN CRAWL ENGINE:[" + searchEngineName + "] SAVE TABLE NAME:[" + jobSaveTableName + "] AT:[" + DateFormatUtils.format(startDate, "yyyy-MM-dd") + "]"));

			if (!runBeforeCrawlProcessor(httpClientService, searchEngineName)) {
				// The crawl is not started; the finally block still releases resources.
				return;
			}

			crawlStarted = true;
			resetSpeedStatis(searchEngineId, startDate, new AtomicInteger(0), new AtomicInteger(0));
			crawlUntilDone(httpClientService, searchEngineName);
		} catch (InterruptedException e) {
			interrupted = true;
			logger.error("", e);
		} catch (Exception e) {
			logger.error("", e);
		} finally {
			boolean interruptedWhileStopping = finishCrawl(searchEngineName, crawlStarted);
			if (interrupted || interruptedWhileStopping) {
				// Restore the interrupt status only after cleanup, so the log-record save above
				// is not disturbed by a pending interrupt.
				Thread.currentThread().interrupt();
			}
		}
		if (crawlStarted) {
			logger.info(EncodeUtil.gbk2iso("END CRAWL ENGINE:[" + searchEngineName + "]"));
		}
	}

	/**
	 * Instantiates and runs the engine's before-crawl processor, if one is configured.
	 *
	 * @return {@code true} if no processor is configured or it reported success; {@code false} if
	 *         it reported failure, could not be instantiated, or threw
	 */
	private boolean runBeforeCrawlProcessor(HttpClientService httpClientService, String searchEngineName) {
		try {
			BeforeCrawlProcessor processor = makeBeforeCrawlProcessor(httpClientService);
			if (processor == null) {
				return true;
			}
			logger.info(EncodeUtil.gbk2iso("[" + searchEngineName + "] start before crawl processor."));
			if (!processor.process()) {
				logger.error("Before crawl processor ERROR!");
				return false;
			}
			logger.info("Before crawl processor SUCCESS!");
			logger.info(EncodeUtil.gbk2iso("[" + searchEngineName + "] end before crawl processor."));
			return true;
		} catch (Exception e) {
			logger.error("", e);
			return false;
		}
	}

	/**
	 * Main scheduling loop. On each poll it either submits a batch of new tasks, waits for a busy
	 * pool, or counts an idle poll. It stops when {@link #stopThread()} was called or when the idle
	 * counter exceeds {@code MAX_WAIT_BREAK_TIME}.
	 *
	 * @throws InterruptedException
	 *             if interrupted while sleeping between polls
	 * @throws Exception
	 *             if loading a configured task/completion class fails or a service call throws
	 */
	private void crawlUntilDone(HttpClientService httpClientService, String searchEngineName) throws Exception {
		if (executor == null) {
			throw new IllegalStateException("initializeThreadPoolExecutor must be called before the crawl starts");
		}
		while (runnable) {
			List<QueryURL> queryUrls = queryURLService.findWillDoWithTryTime(searchEngineId, findUrlSize);
			if (CollectionUtils.isNotEmpty(queryUrls)) {
				// New work: reset the idle counter and submit one task per URL.
				currWaitBreakTime = 0;
				for (QueryURL queryUrl : queryUrls) {
					executor.execute(makeCrawTask(httpClientService, queryUrl));
				}
				continue;
			}

			long activeTaskCount = executor.getActiveCount();
			int queueSize = executor.getQueue().size();
			if (activeTaskCount != 0 || queueSize != 0) {
				// Nothing new in the database, but tasks are still running or queued.
				logger.info(EncodeUtil.gbk2iso(" [" + searchEngineName + "] not have query urls in database but have [" + activeTaskCount + "] active tasks in executor queue size:[" + queueSize + "] sleep five seconds."));
				TimeUnit.SECONDS.sleep(BUSY_POOL_SLEEP_SECOND);
				continue;
			}

			// Database and pool are both idle.
			runCompletionCrawlOnce();

			if (currWaitBreakTime >= MAX_WAIT_BREAK_TIME) {
				break;
			}
			currWaitBreakTime++;
			logger.info(EncodeUtil.gbk2iso("[" + searchEngineName + "] wait break time:[" + currWaitBreakTime + "] and sleep:[" + WAIT_BREAK_INTERVAL_SECOND + "] second."));
			TimeUnit.SECONDS.sleep(WAIT_BREAK_INTERVAL_SECOND);
		}
	}

	/**
	 * Creates and runs the engine's completion crawl at most once per run. It runs only on the
	 * local master server and only when a completion class is configured. Failures inside
	 * {@code doCompletion()} are logged and ignored. Afterwards the idle counter is reset, because
	 * the completion may have queued new query URLs.
	 *
	 * @throws ClassNotFoundException
	 *             if the configured completion class cannot be loaded
	 */
	@SuppressWarnings("unchecked")
	private void runCompletionCrawlOnce() throws ClassNotFoundException {
		// Same short-circuit order as a plain && chain: isLocalMaster() is evaluated first.
		if (!serverService.isLocalMaster() || completionCrawl != null || StringUtils.isBlank(searchEngine.getCompletionSpiderClass())) {
			return;
		}
		Class<CompletionCrawl> completionCrawlClazz = ClassUtils.getClass(searchEngine.getCompletionSpiderClass());
		completionCrawl = BeanUtils.instantiate(completionCrawlClazz);
		completionCrawl.setCrawlJobService(crawJobService);
		completionCrawl.setSearchEngine(searchEngine);
		completionCrawl.setSearchEngineId(searchEngineId);
		completionCrawl.setQueryURLService(queryURLService);
		completionCrawl.setMarkDate(markDate);
		completionCrawl.setJobSaveTableName(jobSaveTableName);
		try {
			completionCrawl.doCompletion();
		} catch (Exception e) {
			logger.error("", e);
		}
		currWaitBreakTime = 0;
	}

	/**
	 * Releases everything the run acquired. The pool is always shut down and awaited, and the
	 * engine is always unregistered. The log record is saved and the statistics are removed only
	 * when {@code crawlStarted} is true.
	 *
	 * @return whether the thread was interrupted while waiting for the pool to terminate
	 */
	private boolean finishCrawl(String searchEngineName, boolean crawlStarted) {
		boolean interrupted = false;
		try {
			interrupted = shutdownAndAwaitExecutor(searchEngineName);
		} catch (Exception e) {
			logger.error("", e);
		} finally {
			try {
				if (crawlStarted) {
					// Every task has finished, so the statistics are final.
					LogRecord record = createLogRecord();
					logRecordService.save(record);
					logger.info(EncodeUtil.gbk2iso("[" + searchEngineName + "]  create log record is[" + record + "] "));
				}
			} catch (Exception e) {
				logger.error("", e);
			} finally {
				if (crawlStarted) {
					removeSpeedStatis(searchEngineId);
					logger.info(EncodeUtil.gbk2iso("[" + searchEngineName + "]  remove speed statis information."));
				}
				// Unregister so the engine can be crawled again.
				SpiderRecordService.removeRunning(searchEngineId);
				logger.info(EncodeUtil.gbk2iso("[" + searchEngineName + "]  remove running thread from crawl job service."));
			}
		}
		return interrupted;
	}

	/**
	 * Shuts the pool down and blocks until every submitted task has finished, without busy-spinning.
	 * Interrupts do not cut the wait short, because statistics must only be recorded after all
	 * tasks are done. They are reported through the return value instead.
	 *
	 * @return {@code true} if an interrupt was received (and cleared) while waiting
	 */
	private boolean shutdownAndAwaitExecutor(String searchEngineName) {
		if (executor == null) {
			return false;
		}
		executor.shutdown();
		logger.info(EncodeUtil.gbk2iso("[" + searchEngineName + "]  execute executor shutdown. "));
		boolean interrupted = false;
		boolean terminated = false;
		while (!terminated) {
			try {
				terminated = executor.awaitTermination(TERMINATION_POLL_SECOND, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				interrupted = true;
			}
		}
		logger.info(EncodeUtil.gbk2iso("[" + searchEngineName + "]  executor shutdown is termainated. "));
		return interrupted;
	}

	/**
	 * Builds the summary log record for this run from the current statistics.
	 *
	 * @return a record stamped with the mark date, local IP, counts, speeds and start/finish times
	 */
	private LogRecord createLogRecord() {
		int engineId = searchEngine.getId();
		LogRecord record = new LogRecord();
		record.setSpiderDate(markDate);
		record.setSearchEngineId(engineId);
		record.setIp(serverService.getLocalIp());
		record.setQueryCount(takeQueryCount(engineId));
		record.setQuerySpeed(toIntClamped(takeQuerySpeed(engineId)));
		record.setRecordCount(takeJobCount(engineId));
		record.setRecordSpeed(toIntClamped(takeJobSpeed(engineId)));
		record.setStartTime(takeStartDate(engineId));
		record.setFinishTime(new Date());
		return record;
	}

	/**
	 * Narrows a {@code long} to an {@code int}, saturating at the int range instead of throwing.
	 *
	 * @param value
	 *            value to narrow
	 * @return {@code value} clamped to [{@link Integer#MIN_VALUE}, {@link Integer#MAX_VALUE}]
	 */
	private static int toIntClamped(long value) {
		return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, value));
	}

	/**
	 * Instantiates the engine's configured spider-task class reflectively and wires in its
	 * collaborators for one query URL.
	 *
	 * @param httpClientService
	 *            HTTP client shared by all tasks of this run
	 * @param queryUrl
	 *            URL the task will crawl
	 * @return a new, fully wired task
	 * @throws ClassNotFoundException
	 *             if the configured task class cannot be loaded
	 */
	@SuppressWarnings({ "unchecked" })
	private SpiderTask makeCrawTask(HttpClientService httpClientService, QueryURL queryUrl) throws ClassNotFoundException {
		Class<SpiderTask> crawClazz = ClassUtils.getClass(searchEngine.getSpiderTaskClass());
		SpiderTask task = BeanUtils.instantiate(crawClazz);
		task.setSearchEngineService(searchEngineService);
		task.setSearchEngineParamService(searchEngineParamService);
		task.setQueryURLService(queryURLService);
		task.setSpiderRecordService(crawJobService);
		task.setHttpClientService(httpClientService);
		task.setQueryURL(queryUrl);
		task.setSearchEngine(searchEngine);
		task.setListPageConfig(listPageConfig);
		task.setCrawlDate(markDate);
		task.setTableName(jobSaveTableName);
		return task;
	}

	/**
	 * Instantiates the engine's configured before-crawl processor class reflectively.
	 *
	 * @param httpClientService
	 *            HTTP client handed to the processor
	 * @return the processor, or {@code null} if no processor class is configured
	 * @throws ClassNotFoundException
	 *             if the configured class cannot be loaded
	 */
	@SuppressWarnings({ "unchecked" })
	private BeforeCrawlProcessor makeBeforeCrawlProcessor(HttpClientService httpClientService) throws ClassNotFoundException {
		String beforeCrawlProcClass = searchEngine.getBeforeSpiderProcClass();
		if (StringUtils.isBlank(beforeCrawlProcClass)) {
			return null;
		}
		Class<BeforeCrawlProcessor> crawClazz = ClassUtils.getClass(beforeCrawlProcClass);
		BeforeCrawlProcessor processor = BeanUtils.instantiate(crawClazz);
		processor.setHttpClientService(httpClientService);
		return processor;
	}

	/**
	 * Requests the crawl to stop. This does not stop it immediately: the flag is checked at the
	 * start of the next scheduling-loop iteration, and the run then waits for submitted tasks to
	 * finish before cleaning up.
	 */
	public void stopThread() {
		runnable = false;
		logger.info("Stop crawl thread searchEngineId:[" + searchEngineId + "]");
	}

	/**
	 * @return {@code false} once {@link #stopThread()} has been called, {@code true} otherwise
	 */
	public boolean getRunnable() {
		return runnable;
	}

	/**
	 * (Re)initialises the statistics of one engine.
	 *
	 * @param searchEngineId
	 *            engine id used as map key
	 * @param crawlStartDate
	 *            crawl start time
	 * @param crawledQueryUrlCount
	 *            counter for processed query URLs
	 * @param crawledJobCount
	 *            counter for crawled records
	 */
	public static void resetSpeedStatis(int searchEngineId, Date crawlStartDate, AtomicInteger crawledQueryUrlCount, AtomicInteger crawledJobCount) {
		START_DATE_MAP.put(searchEngineId, crawlStartDate);
		logger.info("Reset task start date map [key:[" + searchEngineId + "],value:[" + crawlStartDate + "]]");
		QUERYED_URL_COUNT_MAP.put(searchEngineId, crawledQueryUrlCount);
		logger.info("Reset crawl query url count map [key:[" + searchEngineId + "],value:[" + crawledQueryUrlCount + "]]");
		CRAWLED_JOB_COUNT_MAP.put(searchEngineId, crawledJobCount);
		logger.info("Reset crawl jobs count map [key:[" + searchEngineId + "],value:[" + crawledJobCount + "]]");
	}

	/**
	 * Removes all statistics of one engine. This also removes it from {@link #getRunningEngineId()}.
	 *
	 * @param searchEngineId
	 *            engine id
	 */
	private void removeSpeedStatis(int searchEngineId) {
		START_DATE_MAP.remove(searchEngineId);
		QUERYED_URL_COUNT_MAP.remove(searchEngineId);
		CRAWLED_JOB_COUNT_MAP.remove(searchEngineId);
	}

	/**
	 * Adds to the processed query-URL count of an engine. This is a no-op if the engine has no
	 * registered statistics.
	 *
	 * @param searchEngineId
	 *            engine id
	 * @param size
	 *            amount to add
	 */
	public static void addQueryCount(int searchEngineId, int size) {
		AtomicInteger count = QUERYED_URL_COUNT_MAP.get(searchEngineId);
		if (count != null) {
			count.addAndGet(size);
		}
	}

	/**
	 * Adds to the crawled record count of an engine. This is a no-op if the engine has no
	 * registered statistics.
	 *
	 * @param searchEngineId
	 *            engine id
	 * @param size
	 *            amount to add
	 */
	public static void addJobCount(int searchEngineId, int size) {
		AtomicInteger count = CRAWLED_JOB_COUNT_MAP.get(searchEngineId);
		if (count != null) {
			count.addAndGet(size);
		}
	}

	/**
	 * @param searchEngineId
	 *            engine id
	 * @return crawl start time of the engine, or {@code null} if it is not running
	 */
	public static Date takeStartDate(int searchEngineId) {
		return START_DATE_MAP.get(searchEngineId);
	}

	/**
	 * @param searchEngieneId
	 *            engine id
	 * @return processed query-URL count of the engine, or 0 if it has no statistics
	 */
	public static int takeQueryCount(int searchEngieneId) {
		AtomicInteger count = QUERYED_URL_COUNT_MAP.get(searchEngieneId);
		return count != null ? count.intValue() : 0;
	}

	/**
	 * @param searchEngieneId
	 *            engine id
	 * @return crawled record count of the engine, or 0 if it has no statistics
	 */
	public static int takeJobCount(int searchEngieneId) {
		AtomicInteger count = CRAWLED_JOB_COUNT_MAP.get(searchEngieneId);
		return count != null ? count.intValue() : 0;
	}

	/**
	 * Average number of processed query URLs per second since the crawl started (integer division).
	 *
	 * @param searchEngineId
	 *            engine id
	 * @return URLs per second, or 0 if not running or less than one second has elapsed
	 */
	public static long takeQuerySpeed(int searchEngineId) {
		// Arguments are evaluated left to right: start date first, then the count.
		return averagePerSecond(START_DATE_MAP.get(searchEngineId), takeQueryCount(searchEngineId));
	}

	/**
	 * Average number of crawled records per second since the crawl started (integer division).
	 *
	 * @param searchEngineId
	 *            engine id
	 * @return records per second, or 0 if not running or less than one second has elapsed
	 */
	public static long takeJobSpeed(int searchEngineId) {
		return averagePerSecond(START_DATE_MAP.get(searchEngineId), takeJobCount(searchEngineId));
	}

	/**
	 * @return {@code count} divided by whole seconds elapsed since {@code startDate}; 0 if
	 *         {@code startDate} is null or no whole second has elapsed
	 */
	private static long averagePerSecond(Date startDate, int count) {
		if (startDate == null) {
			return 0L;
		}
		long elapsedSeconds = differenceSeconds(startDate, new Date());
		return elapsedSeconds != 0L ? count / elapsedSeconds : 0L;
	}

	public void setSearchEngineId(int searchEngineId) {
		this.searchEngineId = searchEngineId;
	}

	public void setMarkDate(Date markDate) {
		this.markDate = markDate;
	}

	public void setJobSaveTableName(String jobSaveTableName) {
		this.jobSaveTableName = jobSaveTableName;
	}

	public void setFindUrlSize(int findUrlSize) {
		this.findUrlSize = findUrlSize;
	}

	/**
	 * Returns the ids of all engines that currently have a registered start date, i.e. whose
	 * crawl is running.
	 *
	 * @return a new, ascending-sorted list (empty if none)
	 */
	public static List<Integer> getRunningEngineId() {
		List<Integer> runningIds = new ArrayList<Integer>(START_DATE_MAP.keySet());
		Collections.sort(runningIds);
		return runningIds;
	}

	/**
	 * @return whole seconds from {@code oldDate} to {@code newDate} (truncated; negative if
	 *         {@code newDate} is earlier)
	 */
	private static long differenceSeconds(Date oldDate, Date newDate) {
		return (newDate.getTime() - oldDate.getTime()) / 1000;
	}

}
